Parallel Computing with Python¶






Master for Smart Data Science

Parallel Computing with Python



Alessio Crisafulli Carpani

Adrien Chaillout-Morlot

The purpose of this project is to implement in Python a procedure for selecting the variables in the logit model. The procedure for selecting the variables is a stepwise search which optimizes the prediction error estimated by cross-validation.


Table of Contents

  • 1  Parallel Computing with Python
  • 2  Logit Data and Model Fitting
  • 3  Cross-Validation
    • 3.1  Parallel CV using Dask Delayed
    • 3.2  Parallel CV using Concurrent Futures
    • 3.3  Strong-Scaling curves
  • 4  Model Selection
    • 4.1  Forward Regression
      • 4.1.1  Forward Regression with CV
      • 4.1.2  Parallel Forward Regression with CV

Logit Data and Model Fitting¶

Let $\left(X_{1}^{\top}, Y_{1}\right), \ldots,\left(X_{n}^{\top}, Y_{n}\right)$ be observed independent copies of the random vector $$ \left(X^{\top}, Y\right) \text { with } \mathbf{X}=\left[\begin{array}{ccccc} 1 & x_{1,1} & x_{1,2} & \ldots & x_{1, p} \\ 1 & x_{2,1} & x_{2,2} & \ldots & x_{2, p} \\ \ldots & & & & \\ 1 & x_{n, 1} & x_{n, 2} & \ldots & x_{n, p} \end{array}\right] \in \mathbb{R}^{n \times(p+1)} \text { and } Y \in\{0,1\} . $$

The distribution of $Y$ given $X=x$ is assumed to be a logit model, such that

$$ \mathbb{P}(Y=1 \mid X=x)=\frac{e^{x^{\top} \beta}}{1+e^{x^{\top} \beta}} \text { and } \mathbb{P}(Y=0 \mid X=x)=1-\mathbb{P}(Y=1 \mid X=x), $$
In [ ]:
# Setup ----------------------
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
import pandas as pd
import time
import tqdm
%precision %.5f
import warnings
warnings.filterwarnings("ignore")

import dask
from dask import delayed
from dask.distributed import Client
import concurrent.futures

import plotly.graph_objects as go
from plotly.subplots import make_subplots

Here a function for generating data is defined and a logistic regression model will be fit with statsmodels

In [ ]:
# Define data generation function 
n = 1000
def logit(x, beta):
    l = x.dot(beta)
    return 1/(1+np.exp(-l))

# Generate data from logit distribution
rng = np.random.RandomState(1234)
x1, x2 = rng.normal(size=n), rng.normal(size=n)
X = sm.add_constant(np.c_[x1, x2]) # add ones to the first column for bias
beta = [1., 2., -1.] # Choose some arbitrary coefficients
p = logit(X, beta)
y = np.random.binomial(1., p) # generate the data

#Plot Data 
plt.axes().set_aspect("equal")
plt.scatter(x1, x2, c = y, marker = ".")

# Fit Logistic regression
model = sm.Logit(y, X).fit()
print(model.summary())
ypred = np.round(model.predict(X))
print(f'Test accuracy = {np.sum(y == ypred)/ y.size}')
Optimization terminated successfully.
         Current function value: 0.439218
         Iterations 7
                           Logit Regression Results                           
==============================================================================
Dep. Variable:                      y   No. Observations:                 1000
Model:                          Logit   Df Residuals:                      997
Method:                           MLE   Df Model:                            2
Date:                Wed, 19 Oct 2022   Pseudo R-squ.:                  0.3307
Time:                        23:08:20   Log-Likelihood:                -439.22
converged:                       True   LL-Null:                       -656.24
Covariance Type:            nonrobust   LLR p-value:                 5.602e-95
==============================================================================
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
const          0.9034      0.091      9.931      0.000       0.725       1.082
x1             1.8763      0.130     14.456      0.000       1.622       2.131
x2            -0.8096      0.093     -8.708      0.000      -0.992      -0.627
==============================================================================
Test accuracy = 0.792

Cross-Validation¶

The following function which takes as input argument the sample. This function returns the estimator of the error of prediction obtained by cross-validation using LeaveOnOut strategy.

Here first we define a more complex set of data, in order to better show the results of the parallel implementation. We have observed that using parallel computation methods is useless for small tasks.

In [ ]:
rng = np.random.RandomState(1234)
n = 1000
beta = [0, 1, 0, -2, 3, 0, 6, -1, 0, 0]
rng = np.random.RandomState(1234)
X = rng.randn(n*len(beta)).reshape(n, len(beta))
p = logit(X, beta)
y = np.random.binomial(1., p)
In [ ]:
%%time
def cv(X, y):
    nobs = np.size(X,0)
    score = 0
    indices = np.arange(nobs)
    for i in range(nobs):
        sapp = indices != i
        sval = i
        model = sm.Logit(y[sapp], X[sapp,:]).fit(disp=False)
        ypred = np.round(model.predict(X[sval, :]))
        score += y[sval] == ypred
    return (score / nobs)[0]
cv(X, y)
CPU times: user 1min 1s, sys: 1min 17s, total: 2min 18s
Wall time: 12.4 s
Out[ ]:
0.92200

Parallel CV using Dask Delayed¶

In [ ]:
%%time
client = Client(threads_per_worker=2, n_workers=4)

result1 = delayed(cv)(X,y)
print(result1.compute())
client.close()
result1.visualize()
0.922
CPU times: user 448 ms, sys: 350 ms, total: 798 ms
Wall time: 4.39 s
Out[ ]:

Parallelizing the CV function using dask.delayed reduced the computational time by its half using 4 cores. Let's see what happens with 8:

In [ ]:
%%time
client = Client(threads_per_worker=4, n_workers=8)
result2 = delayed(cv)(X,y)
print(result2.compute())
client.close()
0.922
CPU times: user 360 ms, sys: 143 ms, total: 503 ms
Wall time: 4.68 s

Increasing the numbers of workers does not always imply improving results. It should be proportional to the workload.

Test of Dask delayed by increasing the set size (20,2000)¶

In [ ]:
def generate_X_y(num_var:int=20,num_obs:int=1000)->np.ndarray:
    """This function generate random set with nvar and nobs

    Args:
        num_var (int, optional): num of var. Defaults to 20.
        num_obs (int, optional): num of obs. Defaults to 1000.

    Returns:
        np.ndarray: X and y
    """
    X = sm.add_constant(np.c_[np.random.normal(size=(num_obs, num_var-1))])
    beta=np.random.normal(size=num_var)
    p = logit(X, beta)
    y = np.random.binomial(1., p).reshape(num_obs,1)
    return X,y
In [ ]:
X_large,y_large=generate_X_y(20,2000)
In [ ]:
%%time
cv(X_large, y_large)
CPU times: user 3min 13s, sys: 3min 55s, total: 7min 9s
Wall time: 37.2 s
Out[ ]:
0.86000

Dask improve speed by 4 when the set size increase

In [ ]:
%%time
client = Client(threads_per_worker=2, n_workers=4)

result1 = delayed(cv)(X_large,y_large)
print(result1.compute())
client.close()
result1.visualize()
0.86
CPU times: user 561 ms, sys: 50.5 ms, total: 612 ms
Wall time: 12.3 s
Out[ ]:

Parallel CV using Concurrent Futures¶

In [ ]:
def cv_thread_process(X:np.ndarray, y:np.ndarray,worker:int,Thread:bool=True)->float:  
    """this function parallelise the cv with Thread or process from concurrent futures 

    Args:
        X (np.ndarray): X matrix
        y (np.ndarray): y vector
        worker (int): number of workers
        Thread (bool, optional): bool=Thread else process. Defaults to True.

    Returns:
        float:return the accuracy
    """
    def cv_iter(i,X,y):
        indices = np.arange(np.size(X,0))
        sapp = indices != i
        sval = i
        model = sm.Logit(y[sapp], X[sapp,:]).fit(disp=False)
        ypred = np.round(model.predict(X[sval, :]))
        score = y[sval] == ypred
        return score
    nobs = np.size(X,0)
    if Thread==True:
        with concurrent.futures.ThreadPoolExecutor(max_workers=worker) as executor:
            results=executor.map(cv_iter, range(1,nobs), [X]*nobs,[y]*nobs)
            executor.shutdown()
    else:
        with concurrent.futures.ThreadPoolExecutor(max_workers=worker) as executor:
            results=map(cv_iter, range(1,nobs), [X]*nobs,[y]*nobs)
            executor.shutdown()
            
    return sum(results)[0]/nobs
In [ ]:
%%time
cv_thread_process(X,y, 12, Thread=True)
CPU times: user 53.1 s, sys: 1min 7s, total: 2min
Wall time: 10.6 s
Out[ ]:
0.92100
In [ ]:
%%time
cv_thread_process(X,y, 12, Thread=False)
CPU times: user 1min 10s, sys: 1min 29s, total: 2min 39s
Wall time: 13.8 s
Out[ ]:
0.92100

Thread doesn't improve and process is worst than normal cv. Concurent futures isn't robust in that case

Strong-Scaling curves¶

To provide quantitative data on the parallel performance of our application, we are going to measure our parallel scaling with Strong Scaling: time for fixed problem size as number of processor is increased

In [ ]:
class StrongScaling:
    """This class generate strong scaling curve"""

    def __init__(self,core_range:list=list(range(1,17))):
        self.core_range=core_range
    
    def strong_scaling(self,X:np.ndarray,y:np.ndarray,type_Thread:bool)->list:
        """This function compute de strong scaling curve for an array X and y
        Args:
            X (np.ndarray): matrix X
            y (np.ndarray): vector y
            type_Thread (bool): True if using Thread False for process

        Returns:
            list: historic of time in function of core in thread or processus
        """
        time_historic=[]
        if type_Thread:
            for i in self.core_range:
                start=time.time()
                cv_thread_process(X, y,i,Thread=True)
                stop=time.time()
                time_historic.append(stop-start)
        else:
            for i in self.core_range:
                start=time.time()
                cv_thread_process(X, y,i,Thread=False)
                stop=time.time()
                time_historic.append(stop-start)
        return time_historic
    
    def strong_scaling_multiple(self,list_var:list,nobs:int)->dict:
        """ this function compute the strong scaling curve for many X and y
        Args:
            list_var (list): list of number of variables
            nobs (int):  number of obs for each set

        Returns:
            dict,dict: dict of time for thread and process 
        """
        
        dictionnary_thread_time={}
        dictionnary_process_time={}
        for var in tqdm.tqdm(list_var):
            X,y=generate_X_y(var,nobs)
            dictionnary_thread_time["variables "+str(var)]=self.strong_scaling(X,y,True)
            dictionnary_process_time["variables "+str(var)]=self.strong_scaling(X,y,False)
        return dictionnary_thread_time,dictionnary_process_time
In [ ]:
dictionnary_thread_time,dictionnary_process_time=StrongScaling().strong_scaling_multiple([10,20,50],1000)
100%|██████████| 3/3 [1:25:18<00:00, 1706.11s/it]
In [ ]:
thread_curves=pd.DataFrame(dictionnary_thread_time,index=range(1,17))
process_curves=pd.DataFrame(dictionnary_process_time,index=range(1,17))
In [ ]:
process_curves
Out[ ]:
variables 10 variables 20 variables 50
1 9.611544 13.168691 83.590250
2 9.242544 12.605175 86.540803
3 9.499396 13.676839 83.119842
4 9.979800 13.934739 83.091265
5 10.638895 13.588097 81.601782
6 7.866486 13.565158 79.976513
7 8.278714 12.855550 78.525045
8 8.465188 10.759225 85.075973
9 8.564870 13.046129 84.761789
10 8.167425 14.140499 78.315940
11 9.260344 12.608719 85.603778
12 8.612048 12.847224 83.710822
13 8.846300 13.996606 84.438727
14 9.309823 13.365880 81.801426
15 8.504881 14.180995 79.193812
16 8.817877 14.696520 78.187195
In [ ]:
thread_curves
Out[ ]:
variables 10 variables 20 variables 50
1 9.334562 11.107100 76.253473
2 7.210241 10.333477 83.459504
3 7.351614 14.607921 112.085886
4 7.499275 20.258789 144.727232
5 7.835736 17.612512 166.841285
6 8.250704 21.094695 188.335552
7 8.268324 20.116830 197.721071
8 8.327990 16.965829 212.924021
9 8.362028 19.255391 216.905652
10 8.297633 16.294657 223.401685
11 8.522640 21.603396 224.033805
12 9.067732 15.333988 233.846651
13 8.251261 19.150719 233.832960
14 8.867811 20.318651 226.862237
15 9.855490 23.283908 239.062054
16 10.099325 20.082687 240.963753
In [ ]:
fig = make_subplots(rows=3, cols=1,subplot_titles=('Run time for 10 variables',  'Run time for 20 variables','Run time for 50 variables'),x_title="Number of workers",
                    y_title="Run time in seconds")

fig.append_trace(go.Scatter(
    x=process_curves.index,
    y=process_curves[process_curves.columns[0]],name=process_curves.columns[0],line_shape='spline'
), row=1, col=1)

fig.append_trace(go.Scatter(
    x=process_curves.index,
    y=process_curves[process_curves.columns[1]],name=process_curves.columns[1],line_shape='spline'
), row=2, col=1)

fig.append_trace(go.Scatter(
    x=process_curves.index,
    y=process_curves[process_curves.columns[2]],name=process_curves.columns[2],line_shape='spline'
), row=3, col=1)

fig.update_layout(height=800, width=800, title_text="Run time Process",title_x=0.5,legend=dict(y=0.5))
fig.show(renderer='notebook')
In [ ]:
fig = make_subplots(rows=3, cols=1,subplot_titles=('Run time for 10 variables',  'Run time for 20 variables','Run time for 50 variables'),x_title="Number of workers",
                    y_title="Run time in seconds")

fig.append_trace(go.Scatter(
    x=thread_curves.index,
    y=thread_curves[thread_curves.columns[0]],name=thread_curves.columns[0],line_shape='spline'
), row=1, col=1)

fig.append_trace(go.Scatter(
    x=thread_curves.index,
    y=thread_curves[thread_curves.columns[1]],name=thread_curves.columns[1],line_shape='spline'
), row=2, col=1)

fig.append_trace(go.Scatter(
    x=thread_curves.index,
    y=thread_curves[thread_curves.columns[2]],name=thread_curves.columns[2],line_shape='spline'
), row=3, col=1)

fig.update_layout(height=800, width=800, title_text="Run time Thread",title_x=0.5,legend=dict(y=0.5))
fig.show(renderer='notebook')
In [ ]:
speed_up_thread=thread_curves.iloc[0,:]/thread_curves
speed_up_process=process_curves.iloc[0,:]/process_curves
In [ ]:
fig = make_subplots(rows=3, cols=1,subplot_titles=('Speed up for 10 variables',  'Speed up for 20 variables','Speed up for 50 variables'),x_title="Number of workers",
                    y_title="Speed up")

fig.append_trace(go.Scatter(
    x=speed_up_process.index,
    y=speed_up_process[speed_up_process.columns[0]],name=speed_up_process.columns[0],line_shape='spline'
), row=1, col=1)

fig.append_trace(go.Scatter(
    x=speed_up_process.index,
    y=speed_up_process[speed_up_process.columns[1]],name=speed_up_process.columns[1],line_shape='spline'
), row=2, col=1)

fig.append_trace(go.Scatter(
    x=speed_up_process.index,
    y=speed_up_process[speed_up_process.columns[2]],name=speed_up_process.columns[2],line_shape='spline'
), row=3, col=1)

fig.update_layout(height=800, width=800, title_text="Strong scaling Process",title_x=0.5,legend=dict(y=0.5))
fig.show(renderer='notebook')
In [ ]:
fig = make_subplots(rows=3, cols=1,subplot_titles=('Speed up for 10 variables',  'Speed up for 20 variables','Speed up for 50 variables'),x_title="Number of workers",
                    y_title="Speed up")

fig.append_trace(go.Scatter(
    x=speed_up_thread.index,
    y=speed_up_thread[speed_up_thread.columns[0]],name=speed_up_thread.columns[0],line_shape='spline'
), row=1, col=1)

fig.append_trace(go.Scatter(
    x=speed_up_thread.index,
    y=speed_up_thread[speed_up_thread.columns[1]],name=speed_up_thread.columns[1],line_shape='spline'
), row=2, col=1)

fig.append_trace(go.Scatter(
    x=speed_up_thread.index,
    y=speed_up_thread[speed_up_thread.columns[2]],name=speed_up_thread.columns[2],line_shape='spline'
), row=3, col=1)

fig.update_layout(height=800, width=800, title_text="Strong scaling Thread",title_x=0.5,legend=dict(y=0.5))
fig.show(renderer='notebook')

Results of Run time aren't robust sometime it improves sometime not. In that case concurent.futures isn't the best way to increase speed time.

Process could be better than thread when set is larger

Model Selection¶

The strategy we have used in this project was the Forward Regression method, which we have implemented using cross validation.

Forward Regression¶

This is the traditional approach using p-values, just to check consistency.

In [ ]:
%%time
rng = np.random.RandomState(1234)
import statsmodels.api as sm
def forward_regression(X, y, threshold_in = 0.01):
    included = []
    nvars = np.size(X, 1)
    while True:
        changed=False
        excluded = list(set(range(nvars))-set(included))
        new_pval = np.full(nvars, np.inf)
        for new_column in excluded:
            test = included+[new_column]
            X1 = sm.add_constant(X[:,test])
            model = sm.Logit(y, X1).fit(disp=False)
            new_pval[new_column] = model.pvalues[-1]
        best_pval = new_pval.min()
        if best_pval < threshold_in:
            best_feature = np.argmin(new_pval)
            included.append(best_feature)                
            changed=True
        if not changed:
            return included

forward_regression(X, y)
CPU times: user 330 ms, sys: 198 ms, total: 528 ms
Wall time: 116 ms
Out[ ]:
[6, 4, 3, 1, 7]

Forward Regression with CV¶

This is our approach. Here the functions iterate among all the possible models by adding a new variable each time and computing the error rate of each model. Then among all the error rate computed, a variable is selected if it corresponds to the minimum one. After then, the iterations continues including the previously selected variable and so on, until when adding another variable does not minimize the least error rate computed in the previous models.

In [ ]:
%%time
rng = np.random.RandomState(1234)

def forward_regression_cv(X:np.ndarray, y:np.ndarray)->list:
    """Function forward selection with cross val

    Args:
        X (np.ndarray): X matrix
        y (np.ndarray): y vector

    Returns:
        list: list of variables
    """
    included = []
    nvars = np.size(X, 1)
    accuracies = []
    while True:
        changed=False
        print(f'Computing CV with subset {included} ...')
        excluded = list(set(range(nvars))-set(included))
        new_acc = np.full(nvars, np.inf)
        for new_column in excluded:
            test = included+[new_column]
            X1 = sm.add_constant(X[:,test])
            new_acc[new_column] = 1-cv(X1,y) #error rate
        best_acc = new_acc.min()
        accuracies.append(new_acc.min())
        # We need to tell the algorithm when to stop including other variables
        # So, after including half of the variables, the best accuracy should be met soon
        # Therefore stop including variables if they do not improve best accuracy recorded yet
        if len(accuracies) > nvars/2:
            if best_acc < min(new_acc):
                best_feature = np.argmin(new_acc)
                included.append(best_feature)        
                changed=True
            if not changed:
                acc = 1-new_acc.min()
                return print(f'Including other variables after {included}, does not improve accuracy of {acc*100}%')
        else:
            if best_acc < np.mean(new_acc):
                best_feature = np.argmin(new_acc)
                included.append(best_feature)   
                changed=True
            if not changed:
                return included
    

forward_regression_cv(X, y)
Computing CV with subset [] ...
Computing CV with subset [6] ...
Computing CV with subset [6, 4] ...
Computing CV with subset [6, 4, 3] ...
Computing CV with subset [6, 4, 3, 7] ...
Computing CV with subset [6, 4, 3, 7, 1] ...
Including other variables after [6, 4, 3, 7, 1], does not improve accuracy of 92.60000000000001%
CPU times: user 5min 12s, sys: 3min 38s, total: 8min 51s
Wall time: 1min 45s

Parallel Forward Regression with CV with dask¶

In [ ]:
%%time
client = Client(threads_per_worker=2, n_workers=4)
print(delayed(forward_regression_cv)(X, y).compute())
client.close()
Computing CV with subset [] ...
Computing CV with subset [6] ...
Computing CV with subset [6, 4] ...
Computing CV with subset [6, 4, 3] ...
Computing CV with subset [6, 4, 3, 7] ...
Computing CV with subset [6, 4, 3, 7, 1] ...
Including other variables after [6, 4, 3, 7, 1], does not improve accuracy of 92.60000000000001%
None
CPU times: user 1.83 s, sys: 1.03 s, total: 2.86 s
Wall time: 1min 21s

Parallel Forward Regression with CV with Concurrent.futures¶

In [ ]:
def forward_regression_cv_concurrent(X:np.ndarray, y:np.ndarray,workers:int,bool_thread:bool)->list:
    """Function forward selection with cross val

    Args:
        X (np.ndarray): X matrix
        y (np.ndarray): y vector
        workers (int): num of workers
        bool_thread(bool): bool to determine thread or process
    Returns:
        list: list of variables
    """
    included = []
    nvars = np.size(X, 1)
    accuracies = []
    while True:
        changed=False
        print(f'Computing CV with subset {included} ...')
        excluded = list(set(range(nvars))-set(included))
        new_acc = np.full(nvars, np.inf)
        for new_column in excluded:
            test = included+[new_column]
            X1 = sm.add_constant(X[:,test])
            new_acc[new_column] = 1-cv_thread_process(X1,y,worker=workers,Thread=bool_thread) #error rate
        best_acc = new_acc.min()
        accuracies.append(new_acc.min())
        # We need to tell the algorithm when to stop including other variables
        # So, after including half of the variables, the best accuracy should be met soon
        # Therefore stop including variables if they do not improve best accuracy recorded yet
        if len(accuracies) > nvars/2:
            if best_acc < min(new_acc):
                best_feature = np.argmin(new_acc)
                included.append(best_feature)        
                changed=True
            if not changed:
                acc = 1-new_acc.min()
                return print(f'Including other variables after {included}, does not improve accuracy of {acc*100}%')
        else:
            if best_acc < np.mean(new_acc):
                best_feature = np.argmin(new_acc)
                included.append(best_feature)   
                changed=True
            if not changed:
                return included
    
In [ ]:
%%time
forward_regression_cv_concurrent(X, y,12,True) #with Thread
Computing CV with subset [] ...
Computing CV with subset [6] ...
Computing CV with subset [6, 4] ...
Computing CV with subset [6, 4, 3] ...
Computing CV with subset [6, 4, 3, 7] ...
Computing CV with subset [6, 4, 3, 7, 1] ...
Including other variables after [6, 4, 3, 7, 1], does not improve accuracy of 92.5%
CPU times: user 11min 21s, sys: 8min 49s, total: 20min 11s
Wall time: 3min 41s
In [ ]:
%%time
forward_regression_cv_concurrent(X, y,12,False) #with process
Computing CV with subset [] ...
Computing CV with subset [6] ...
Computing CV with subset [6, 4] ...
Computing CV with subset [6, 4, 3] ...
Computing CV with subset [6, 4, 3, 7] ...
Computing CV with subset [6, 4, 3, 7, 1] ...
Including other variables after [6, 4, 3, 7, 1], does not improve accuracy of 92.5%
CPU times: user 4min 8s, sys: 2min 51s, total: 7min
Wall time: 1min 29s

Process is more efficient than thread even in model selection

Conclusion¶

Parallelisation with dask is easier to implement and give better results than concurent.futures. Moreover, dask delayed is more robust than concurent.futures Thread and process.
With the strong scaling curves we clearly observe that paralelisation with thread and process isn't relevant and comport an important part of randomness.
For huge dataset Process seems more efficient than Thread regarding the Strong scaling curves. Sometime with huge datasets we can observe a few improvement but it's not really robust..
Delayed has improved the speed of our code by 3 or 4.